[Bigtable Sink] Add Extract Timestamp SMT#201
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a new ExtractTimestamp Single Message Transformation (SMT) for extracting record timestamps from nested fields and refactors the KeyMapper to utilize a new SchemaParsingUtils utility. Review feedback highlights a syntax error in FlattenArrayElementTest, suggests renaming misleading variables for clarity, and recommends using DataException instead of IllegalArgumentException to align with Kafka Connect standards. Additionally, improvements were suggested for configuration descriptions and handling potential precision loss when parsing string-based timestamps.
I am having trouble creating individual review comments. Click here to see my feedback.
kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/transformations/FlattenArrayElementTest.java (381)
A syntax error was introduced here (gi.toList()). This will cause a compilation failure.
Arrays.stream(new Struct[] {null, productElement2, productElement3}).toList();
kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (49)
The configuration description contains a 'TODO'. Please provide a meaningful description for the timestamp format configuration.
"The format of the timestamp field (e.g., SECONDS, MILLIS, MICROS, NANOS).");
kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (52)
The variable name elementWrapperFieldName is misleading as it stores an enum representing the timestamp format, not a field name. Consider renaming it to timestampFormat.
private ExtractTimestampFormat timestampFormat;
kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (58)
Parsing the enum directly from the config string using valueOf is case-sensitive and will throw an IllegalArgumentException if the value is invalid. It's better to use toUpperCase() for robustness and consider adding a validator to the ConfigDef.
this.timestampFormat = ExtractTimestampFormat.valueOf(config.getString(TIMESTAMP_FORMAT_TYPE).toUpperCase(java.util.Locale.ROOT));
kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (64)
Update usage of renamed variable timestampFormat.
long parsedTimestampMillis = ExtractTimestamp.parseTimestampToMillis(timestampField, timestampFormat);
kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (90)
In Kafka Connect transformations, DataException is typically preferred over IllegalArgumentException for errors encountered during record processing.
throw new DataException("Cannot parse timestamp value of null");
kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (113)
Casting the result of Double.parseDouble directly to long truncates the fractional part. If the input string represents seconds (e.g., "1715698738.123"), this results in a loss of precision (milliseconds) before the resolution logic is applied. Consider parsing as a double and multiplying by the appropriate factor (e.g., 1000 for seconds) before converting to long.
kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (116-117)
Use DataException instead of IllegalArgumentException for consistency with Kafka Connect standards.
throw new DataException(
"Unsupported timestamp payload type: " + rawValue.getClass().getName());
kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtils.java (59)
This utility is now used by both KeyMapper and ExtractTimestamp. The error message 'Error with row key definition' is too specific and will be misleading when used in the context of timestamp extraction. Please use a more generic message.
throw new DataException("Extracted field value cannot be null.");
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the ExtractTimestamp SMT to enable extracting cell timestamps from Kafka message keys or values, supported by new utility classes ConfigUtils and SchemaParsingUtils. Existing components, including KeyMapper and BigtableSinkConfig, were refactored to use these utilities, and the README.md was updated with relevant documentation. Review feedback identifies a case-sensitivity bug in configuration parsing, precision loss when handling floating-point timestamp strings, and the need for improved variable naming and import practices. Minor documentation typos and grammar issues were also noted.
1d7a464 to
03f4a26
Compare
annguy3n
left a comment
There was a problem hiding this comment.
Should we add some use cases/examples for this in documentation?
| * @param index Index of the field that is being extracted. | ||
| * @return Extracted nested field. | ||
| */ | ||
| private static SchemaAndValue extractField( |
There was a problem hiding this comment.
how would this handle nested fields?
There was a problem hiding this comment.
The fields array is a path to a (potentially nested) field, where each array element is a field and the following element is a child field. extractField(...) will recursively iterate over the provided fields array until it finds the inner most field and returns that field's schema and value. I'll add this to this method's doc
Added to the readme |
999a7ea to
57503c9
Compare
Adds a new SMT for extracting a timestamp from a message value or key to use as the message timestamp